package org.databandtech.mockmq;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import org.databandtech.mockmq.entity.AdMonitor;

import com.google.gson.Gson;

/**
 * Standalone mock producer: generates {@code COUNT} random {@link AdMonitor}
 * records, serializes each to JSON with Gson, and sends them synchronously
 * to the {@code ADMONITOR} Kafka topic, printing the returned metadata.
 */
public class AdKafkaLog {

	final static String HOST = "192.168.13.52:9092";//"192.168.10.60:9092"
	final static String TOPIC = "ADMONITOR";
	final static int COUNT = 10;    // number of records to send
	final static int PARTITION = 0; // target partition (only used by the commented-out explicit-partition send)

	final static String[] OS = {"android","ios"};
	final static String[] CITYS = {"北京","北京","北京","上海","上海","上海","广州","广州","深圳","深圳","重庆","杭州","武汉","南京","郑州","西安","成都","长沙"};
	final static String[] APPS = {"腾讯","爱奇艺","优酷"};

	public static void main(String[] args) {

		Properties properties = new Properties();
		properties.put("bootstrap.servers", HOST);
		// acks semantics:
		//  0: producer does not wait for any broker ack
		//  1: leader acks after it has written the record
		// -1: ack only after all in-sync followers have replicated the record
		properties.put("acks", "-1");
		properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		// Gson instances are reusable; create one outside the loop instead of per record.
		Gson gson = new Gson();

		// try-with-resources closes the producer on exit (flushing any buffered
		// records) instead of leaking it behind a @SuppressWarnings("resource").
		try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties)) {
			// -- synchronous send loop
			for (int i = 1; i <= COUNT; i++) {
				AdMonitor ad = buildMockAd();

				// Explicit-partition variant (ProducerRecord has several overloads):
				//ProducerRecord<String, String> pr = new ProducerRecord<String, String>(TOPIC,PARTITION,"key"+i, msg +"--"+i);
				// Send to the default (partitioner-chosen) partition:
				String jsonStr = gson.toJson(ad);
				ProducerRecord<String, String> pr = new ProducerRecord<String, String>(TOPIC, jsonStr);
				try {
					// get() blocks until the broker acks, making the send synchronous.
					RecordMetadata metadata = kafkaProducer.send(pr).get();
					System.out.println("TopicName : " + metadata.topic() + " Partiton : " + metadata
		                    .partition() + " Offset : " + metadata.offset()+"--"+jsonStr+i);
				} catch (InterruptedException e) {
					// Restore the interrupt flag (caught InterruptedException clears it)
					// and stop sending further records.
					Thread.currentThread().interrupt();
					e.printStackTrace();
					break;
				} catch (ExecutionException e) {
					// Broker-side/send failure for this record; log and continue with the next.
					e.printStackTrace();
				}
			}
		}
	}

	/**
	 * Builds one AdMonitor record populated with random mock data and the
	 * current local time formatted as an ISO-8601 local date-time string.
	 */
	private static AdMonitor buildMockAd() {
		AdMonitor ad = new AdMonitor();
		ad.setAdid(Mock.getNumString(8));
		ad.setAdmid("Ad-" + Mock.getNumString(20));
		ad.setAppname(APPS[Mock.getNum(0, APPS.length - 1)]);
		ad.setCitycode(CITYS[Mock.getNum(0, CITYS.length - 1)]);
		ad.setIp("0.0.0.0");
		ad.setMac1("B2-1A-3A-40-4B-0B");
		ad.setOs(OS[Mock.getNum(0, OS.length - 1)]);
		LocalDateTime localDateTime = LocalDateTime.now();
		ad.setTs(localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
		ad.setUid(Mock.getCnName());
		return ad;
	}

}
